Step jobs + waiting for children (with retries) not working as expected · Issue #2566 · OptimalBits/bull · GitHub



Description

Hello! Thanks for your hard work on this lib.

We have been migrating our task handling to Bull, and have now upgraded to BullMQ. A big benefit we see is the ability to create child jobs at runtime, which other workers can then pick up in parallel.

After many attempts, I've found that the wait-for-children feature might not be working as expected (or the example is wrong, or I'm doing something wrong :) ).

The expected behavior, if I understand this correctly, is that `moveToWaitingChildren` moves the parent job to the waiting-children state, and the parent won't be picked up for processing again until its children have completed. In our case, the parent resumes execution before that happens, with some lock errors. I saw a few issues about `Error: Lock mismatch for job ...`, but those should not explain why the job is picked up again even though its children have not finished.
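The expected flow can be pinned down with a toy, in-memory model. Everything in it is hypothetical: `ToyParentJob`, `childSettled`, `take`, `currentState` and the state names were invented for illustration. Its method `moveToWaitingChildren` only mimics the contract assumed above, and none of this is BullMQ's implementation.

The model encodes two assumptions:

- The parent becomes runnable again only after every child has settled.
- Moving to waiting-children releases the parent's lock.

Under the second assumption, finishing the job afterwards with the old token fails with a "missing lock" style error. That is one possible reading of the errors in the logs below, not a confirmed diagnosis.

```typescript
// Toy model only: hypothetical names, not BullMQ's internals.
type ParentState = "active" | "waiting-children" | "waiting" | "completed";

class ToyParentJob {
  private state: ParentState = "active";
  private lockToken: string | null;
  private readonly pending: Set<string>;

  constructor(token: string, childIds: string[]) {
    this.lockToken = token; // the worker processing the parent holds the lock
    this.pending = new Set(childIds);
  }

  currentState(): ParentState {
    return this.state;
  }

  // Mimics the assumed contract: true means "children are still pending,
  // stop processing", and in this model the lock is given up at that point.
  moveToWaitingChildren(token: string): boolean {
    this.assertLock(token);
    if (this.pending.size === 0) {
      return false; // nothing to wait for: keep the lock and continue
    }
    this.state = "waiting-children";
    this.lockToken = null;
    return true;
  }

  // A child has finished for good (here: completed, or out of attempts).
  childSettled(childId: string): void {
    this.pending.delete(childId);
    if (this.state === "waiting-children" && this.pending.size === 0) {
      this.state = "waiting"; // only now can a worker pick the parent up again
    }
  }

  // A worker picks the parent up again and receives a fresh lock.
  take(token: string): void {
    if (this.state !== "waiting") {
      throw new Error(`cannot take a job in state ${this.state}`);
    }
    this.state = "active";
    this.lockToken = token;
  }

  complete(token: string): void {
    this.assertLock(token);
    this.state = "completed";
  }

  private assertLock(token: string): void {
    if (this.lockToken === null) throw new Error("Missing lock for job");
    if (this.lockToken !== token) throw new Error("Lock mismatch for job");
  }
}

// Returning normally after moveToWaitingChildren() === true and then
// finishing with the old token trips the lock check in this model.
const parent = new ToyParentJob("token-1", ["child-1"]);
console.log(parent.moveToWaitingChildren("token-1")); // true
try {
  parent.complete("token-1");
} catch (e) {
  console.log((e as Error).message); // Missing lock for job
}
parent.childSettled("child-1");
console.log(parent.currentState()); // waiting
```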

Below is a working example, along with the logs I'm getting from it.

Minimal, working test code to reproduce the issue:

```typescript
import { Job, Queue, Worker } from 'bullmq';

// `config.redis` holds our app's Redis connection options (not shown here).
const connection = config.redis;

const parentQueue = new Queue('parentQueue', {
  connection,
});

const childQueue = new Queue('childQueue', {
  connection,
});

const childWorker = new Worker(
  childQueue.name,
  async (job: Job) => {
    console.log('start child processor');
    console.log('child attempts: ', job.attemptsMade);
    // throw to test attempts
    throw new Error('failing child');
  },
  { connection },
);

childWorker.on('error', (err) => {
  // log the error
  console.error('child error', err);
});

enum Step {
  Initial,
  Second,
  Finish,
}

const parentWorker = new Worker(
  parentQueue.name,
  async (job: Job, token) => {
    console.log('start parent processor');
    console.log('parent attempts: ', job.attemptsMade);
    let step = job.data.step || Step.Initial;
    while (step !== Step.Finish) {
      switch (step) {
        case Step.Initial: {
          console.log({ step });
          await childQueue.add(
            `child-${Math.floor(Math.random() * 1000) + 1}`,
            { foo: 'bar' },
            {
              parent: {
                id: job.id || '', // Why is id optional on Job but required in the parent option?
                queue: job.queueQualifiedName,
              },
              attempts: 20,
              backoff: {
                type: 'fixed',
                delay: 100,
              },
            },
          );
          await job.update({
            step: Step.Second,
          });
          step = Step.Second;
          break;
        }
        case Step.Second: {
          console.log({ step });
          // Why is the processor's token optional, but moveToWaitingChildren's is not?
          const shouldWait = await job.moveToWaitingChildren(token || '');
          console.log({ shouldWait });
          if (shouldWait) {
            return;
          }
          // We should reach this point only when the child job has finished
          // or failed all of its 20 attempts, right?
          console.log('continue after children finished / failed (all retries)');
          await job.update({
            step: Step.Finish,
          });
          step = Step.Finish;
          return Step.Finish;
        }
        default: {
          throw new Error('invalid step');
        }
      }
    }
  },
  { connection },
);

parentWorker.on('error', (err) => {
  // log the error
  console.error('parent error', err);
});
```

Logs

```
start parent processor
parent attempts: 1
{ step: 0 }
start child processor
child attempts: 1
{ step: 1 }
{ shouldWait: true }
parent error Error: Missing lock for job 6. failed
start child processor
child attempts: 2
start child processor
child attempts: 3
start parent processor    --> child attempts is only 3, yet the parent processor starts again
parent attempts: 2
{ step: 1 }
continue after children finished / failed (all retries)    --> why did it not log `shouldWait` before continuing? :O
```
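The re-entry contract the parent processor relies on can be isolated from Redis with a small synchronous model. `FakeJob`, `childrenDone` and `processStepJob` are hypothetical stand-ins, not BullMQ API:

- `childrenDone` plays the role of `moveToWaitingChildren` returning `false`.
- Assigning `job.data` plays the role of `job.update`.

The model shows that because the step is persisted before the processor yields, the next run starts again at `Step.Second` and repeats the wait check. That matches the `{ step: 1 }` line printed when the parent processor restarts in the logs above.

```typescript
// Hypothetical stand-ins for illustration; no BullMQ or Redis involved.
enum Step {
  Initial,
  Second,
  Finish,
}

interface FakeJob {
  data: { step?: Step }; // plays the role of the persisted job data
  childrenDone: boolean; // plays the role of moveToWaitingChildren() returning false
}

// Returns "waiting" when the run has to yield, or Step.Finish once done.
function processStepJob(job: FakeJob): "waiting" | Step {
  let step = job.data.step ?? Step.Initial;
  while (step !== Step.Finish) {
    switch (step) {
      case Step.Initial:
        // (child jobs would be enqueued here)
        job.data = { step: Step.Second }; // persist progress before moving on
        step = Step.Second;
        break;
      case Step.Second:
        if (!job.childrenDone) {
          return "waiting"; // yield; the next run re-enters at Step.Second
        }
        job.data = { step: Step.Finish };
        step = Step.Finish;
        break;
      default:
        throw new Error("invalid step");
    }
  }
  return Step.Finish;
}

const job: FakeJob = { data: {}, childrenDone: false };
console.log(processStepJob(job)); // waiting
console.log(job.data.step === Step.Second); // true
job.childrenDone = true;
console.log(processStepJob(job) === Step.Finish); // true
```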

Another log sample:

```
start parent processor
parent attempts: 1
{ step: 0 }
start child processor
child attempts: 1
{ step: 1 }
parent error Error: Missing lock for job 15. failed
start child processor
child attempts: 2
start child processor
child attempts: 3
start child processor
child attempts: 4
start child processor
child attempts: 5    ---> in this case it got up to attempt 5
start parent processor
parent attempts: 2
{ step: 1 }
{ shouldWait: false }    ---> in this case it did print shouldWait...
continue after children finished / failed (all retries)
```
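A quick count helps frame these logs. It rests on two assumptions: `attempts` is the total number of tries with the first run included, and a `fixed` backoff waits the same `delay` before every retry.

Under those assumptions, the child options in the repro (`attempts: 20`, `delay: 100`) allow 19 retries and at least 1.9 s of backoff before the child runs out of attempts. A parent resuming after attempt 3 or attempt 5 is therefore well before the point at which the comment in the repro expects it to continue.

`fixedBackoffDelays` is a hypothetical helper and ignores processing time.

```typescript
// Hypothetical helper: the wait before each retry under a "fixed" backoff.
// `attempts` is taken to include the first run, so there are attempts - 1 retries.
function fixedBackoffDelays(attempts: number, delayMs: number): number[] {
  const retries = Math.max(0, attempts - 1);
  return Array.from({ length: retries }, () => delayMs);
}

// The child options from the repro: attempts: 20, backoff { type: 'fixed', delay: 100 }.
const delays = fixedBackoffDelays(20, 100);
const minBackoffMs = delays.reduce((sum, d) => sum + d, 0);
console.log(delays.length, minBackoffMs); // 19 1900
```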

I switched from

```typescript
if (shouldWait) {
  return;
}
...
```

to

```typescript
if (!shouldWait) {
  // We should reach this point only when the child job has finished
  // or failed all of its 20 attempts, right?
  console.log('continue after children finished / failed (all retries)');
  await job.update({
    step: Step.Finish,
  });
  step = Step.Finish;
  return Step.Finish;
} else {
  return;
}
```

and now it just seems to be stuck (I'm not sure whether a truthiness check is involved and the return type is wrong?):

```
start parent processor
parent attempts: 1
{ step: 0 }
start child processor
child attempts: 1
{ step: 1 }
parent error Error: Missing lock for job 44. failed
start child processor
child attempts: 2
start child processor
child attempts: 3
```
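The switched branch is logically identical to the original one. `if (x) A else B` and `if (!x) B else A` test the same truthiness, so they take the same path for `true` and `false`, and for any non-boolean value as well.

A small check makes that concrete. `originalShape` and `switchedShape` are hypothetical reductions of the two snippets to the path each one takes.

If the behaviour differs between runs, the cause is probably not this change. That is an inference, not something the logs prove.

```typescript
type Path = "yield" | "finish";

// Original shape: if (shouldWait) { return; } ...then continue to finish
function originalShape(shouldWait: unknown): Path {
  if (shouldWait) {
    return "yield";
  }
  return "finish";
}

// Switched shape: if (!shouldWait) { ...finish } else { return; }
function switchedShape(shouldWait: unknown): Path {
  if (!shouldWait) {
    return "finish";
  } else {
    return "yield";
  }
}

// Booleans plus a few non-boolean values, in case the runtime type were off.
const samples: unknown[] = [true, false, undefined, null, 0, 1, "", "false", {}];
const allAgree = samples.every((v) => originalShape(v) === switchedShape(v));
console.log(allAgree); // true
```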

Another thing I noticed: when I call the endpoint (a test endpoint I created just for this, which returns the created job), sometimes the job is created and returned but never starts executing.

Bull version

"bullmq": "^3.10.1",

Additional information

